feat(ingest/kafka-connect): Kafka connect infer lineage from DataHub #15234
base: master
Conversation
❌ 1 Tests Failed:
View the top 1 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard.
impressive docs!
```python
# Schema resolver configuration for enhanced lineage
use_schema_resolver: bool = Field(
    default=False,
    description="Use DataHub's schema metadata to enhance CDC connector lineage. "
```
I would avoid introducing CDC as a new term to refer to Kafka Connect
CDC connector --> Kafka Connector
CDC sources/sinks -> Kafka Connect sources/sinks
```
It's used when connectors don't have table.include.list configured, meaning they
capture ALL tables from the database.
The method first tries to use cached URNs from SchemaResolver (populated from
```
populated from previous ingestion runs? 🤔
```python
# Use graph.get_urns_by_filter() to get all datasets for this platform
# This is more efficient than a search query and uses the proper filtering API
all_urns = set(
    self.schema_resolver.graph.get_urns_by_filter(
```
We use the schema resolver here

```python
all_urns = self.schema_resolver.get_urns()
```

* Is this SchemaResolver instance instantiated with the platform and env of the source database? Anyway, either the resolver is populated with a search such as the one below, or it will be empty.

and here

```python
self.schema_resolver.graph.get_urns_by_filter(
    entity_types=["dataset"],
    platform=platform,
    platform_instance=self.schema_resolver.platform_instance,
    env=self.schema_resolver.env,
)
```

* here we just use the `graph` object, not actually using the SchemaResolver itself

I have the feeling that the usage of the SchemaResolver is very residual and only amounts to using its `DataHubGraph` object for doing the search.
Instead, I think it would be better to move some of this pattern+search logic down into the SchemaResolver itself. Or alternatively, just use the `DataHubGraph` object directly instead of taking a dependency on the SchemaResolver.
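For illustration, if only the lookup is needed, a small helper over the `DataHubGraph` client along these lines (a sketch reusing the same `get_urns_by_filter` call quoted above; the helper name is made up) would drop the SchemaResolver dependency entirely:

```python
# Sketch: query DataHub directly through the DataHubGraph client instead of
# reaching into SchemaResolver just for its graph handle.
from typing import Optional, Set

from datahub.ingestion.graph.client import DataHubGraph


def discover_platform_dataset_urns(
    graph: DataHubGraph,
    platform: str,
    platform_instance: Optional[str],
    env: str,
) -> Set[str]:
    return set(
        graph.get_urns_by_filter(
            entity_types=["dataset"],
            platform=platform,
            platform_instance=platform_instance,
            env=env,
        )
    )
```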
```python
# Filter by platform
if f"dataPlatform:{platform}" not in urn:
    continue
```
this shouldn't be necessary with the get_urns_by_filter search, no?
```python
# Filter by database - check if table_name starts with database prefix
if database_name:
    if table_name.lower().startswith(f"{database_name.lower()}."):
        # Remove database prefix to get "schema.table"
        schema_table = table_name[len(database_name) + 1 :]
        discovered_tables.append(schema_table)
```
All this parsing of the URN... I find it fragile.
Couldn't we make a more specific search, e.g. search for datasets in a given database (which should be a container)?
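For what it's worth, a container-scoped lookup could look roughly like the sketch below. This assumes the database container URN is already known (the `database_container_urn` variable is hypothetical) and that the SDK's `get_urns_by_filter` accepts a `container` filter in the version used here; if not, the same idea could go through a custom search filter instead.

```python
# Hypothetical sketch: scope the dataset search to the database's container
# instead of string-matching a database prefix inside each URN.
database_container_urn = "urn:li:container:..."  # assumed to be resolved elsewhere

discovered_urns = list(
    self.schema_resolver.graph.get_urns_by_filter(
        entity_types=["dataset"],
        platform=platform,
        platform_instance=self.schema_resolver.platform_instance,
        env=self.schema_resolver.env,
        container=database_container_urn,  # assumption: container filtering is supported
    )
)
```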
```python
# Build target URN using DatasetUrn helper with correct target platform
target_urn = DatasetUrn.create_from_ids(
    platform_id=target_platform,
```
platform instance?
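A possible shape for carrying the instance through, assuming `DatasetUrn.create_from_ids` accepts a `platform_instance` argument in the SDK version used (otherwise `make_dataset_urn_with_platform_instance` from `datahub.emitter.mce_builder` covers the same case). The `target_*` and `env` variables are placeholders standing in for values from the surrounding code, and `DatasetUrn` is already imported in this file per the quoted diff:

```python
# Sketch: include the configured platform instance when building the target URN
# (mirrors the quoted call; assumes create_from_ids accepts platform_instance).
target_urn = DatasetUrn.create_from_ids(
    platform_id=target_platform,
    table_name=target_table,                      # placeholder: table name for the sink
    env=env,                                      # placeholder: environment from config
    platform_instance=target_platform_instance,  # placeholder: instance for the sink platform
)
```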
```python
try:
    # Get all URNs from schema resolver and filter for the source platform
    # The cache may contain URNs from other platforms if shared across runs
```
what do you mean "shared across runs"?
```python
if self.schema_resolver and self.schema_resolver.graph:
    logger.info(
        f"Kafka API unavailable for connector '{self.connector_manifest.name}' - "
        f"querying DataHub for Kafka topics to expand pattern '{topics_regex}'"
    )
    try:
        # Query DataHub for all Kafka topics
        kafka_topic_urns = list(
            self.schema_resolver.graph.get_urns_by_filter(
                platform="kafka",
                env=self.schema_resolver.env,
                entity_types=["dataset"],
            )
        )

        datahub_topics = []
        for urn in kafka_topic_urns:
            topic_name = self._extract_table_name_from_urn(urn)
            if topic_name:
                datahub_topics.append(topic_name)

        matched_topics = matcher.filter_matches([topics_regex], datahub_topics)

        logger.info(
            f"Found {len(matched_topics)} Kafka topics in DataHub matching pattern '{topics_regex}' "
            f"(out of {len(datahub_topics)} total Kafka topics)"
        )
        return matched_topics
```
We claim here that we're using the SchemaResolver, but we are just using the graph object and we are not even updating the SchemaResolver cache afterwards.
As per my understanding, we should refresh the SchemaResolver cache with the results of the search.
And ideally, we could move some of this logic down into the SchemaResolver class.
In SchemaResolver we have

```python
def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]: ...
```

maybe we could add some new method to resolve:
- URNs and schemas for a given database
- or URNs and schemas for a given regexp pattern

I think pushing this kind of functionality down to the SchemaResolver would simplify code in the source and help achieve a better separation of responsibilities.
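To make the suggestion concrete, a new method could look roughly like this. The name, signature, and URN handling are hypothetical, not existing SchemaResolver API; it only builds on the `get_urns()` accessor mentioned above:

```python
import re
from typing import List, Optional


class SchemaResolver:  # existing class; only the new method is sketched here
    def resolve_urns_by_pattern(
        self, pattern: str, database: Optional[str] = None
    ) -> List[str]:
        """Return cached dataset URNs whose table name matches `pattern`,
        optionally restricted to tables under `database`."""
        compiled = re.compile(pattern, re.IGNORECASE)
        matched: List[str] = []
        for urn in self.get_urns():  # cache populated from DataHub
            # Dataset URNs look like: urn:li:dataset:(urn:li:dataPlatform:<p>,<name>,<env>)
            parts = urn.split(",")
            if len(parts) < 2:
                continue
            name = parts[1]
            if database and not name.lower().startswith(f"{database.lower()}."):
                continue
            if compiled.search(name):
                matched.append(urn)
        return matched
```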
```python
from datahub.sql_parsing.schema_resolver import SchemaResolver

# Get platform from connector instance (single source of truth)
platform = connector.get_platform()

# Get platform instance if configured
platform_instance = get_platform_instance(
    config, connector.connector_manifest.name, platform
)

logger.info(
    f"Creating SchemaResolver for connector {connector.connector_manifest.name} "
    f"with platform={platform}, platform_instance={platform_instance}"
)

return SchemaResolver(
    platform=platform,
    platform_instance=platform_instance,
    env=config.env,
    graph=ctx.graph,
)
```
In bigquery, we do

```python
return self.ctx.graph.initialize_schema_resolver_from_datahub(
    platform=self.platform,
    platform_instance=self.config.platform_instance,
    env=self.config.env,
    batch_size=self.config.schema_resolution_batch_size,
)
```

which instantiates the SchemaResolver and populates its cache.
That would make sense here too, right? Have you considered it?
good abstractions here! ❤️
```python
)


class TestParseCommaSeparatedList:
```
nice tests covering all cases
wondering if parse_comma_separated_list may be moved to some utils class
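If it were extracted, the shared helper might look roughly like this (location and exact semantics are assumptions based on the name and the tests quoted below):

```python
# Hypothetical shared utility (sketch): split a comma-separated config value
# into trimmed, non-empty items.
from typing import List


def parse_comma_separated_list(value: str) -> List[str]:
    if not value:
        return []
    return [item.strip() for item in value.split(",") if item.strip()]
```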
```python
        assert parse_comma_separated_list(input_str) == items


class TestConnectorConfigKeys:
```
wondering about the value of testing constants 😅
sgomezvillamor left a comment
While I haven't reviewed all files in detail, overall this looks pretty good.
I haven't been able to check coverage in Codecov; it may well be high given the number of tests.
My concern is the usage of the SchemaResolver. We claim to use it even in the user docs, and this is also reflected in the configs; however, the usage mostly amounts to using the graph object inside the SchemaResolver:
- we never refresh (or I missed it!) the internal cache of SchemaResolver (the caching would be one of the reasons to use SchemaResolver)
- and we still do a lot of "resolution" logic outside of the SchemaResolver
Summary
This PR adds automatic lineage inference from DataHub to the Kafka Connect source connector. Instead of relying solely on connector manifests, the ingestion can now query DataHub's metadata graph to resolve schemas and generate both table-level and column-level lineage.
Motivation
Currently, Kafka Connect lineage extraction is limited by what's explicitly declared in connector configurations. This PR enables:
- Patterns like `table.include.list: "database.*"` can now be resolved to actual table names by querying DataHub

Changes
New Configuration Options
Added three new configuration fields to `KafkaConnectSourceConfig`, including `use_schema_resolver`.

Auto-Enable for Confluent Cloud

New behavior: `use_schema_resolver` is automatically enabled when Confluent Cloud is detected via:
- `confluent_cloud_environment_id` + `confluent_cloud_cluster_id` configuration
- Connect URI pointing to Confluent Cloud (`api.confluent.cloud/connect/v1/`)

Users can opt out by explicitly setting `use_schema_resolver: false`.

Core Components
- SchemaResolver Integration (`connector_registry.py`): `create_schema_resolver()` method to instantiate schema resolvers with platform-specific configurations
- Fine-Grained Lineage Extraction (`common.py`): `_extract_fine_grained_lineage()` method in `BaseConnector`, emitting `FineGrainedLineageClass` instances for column-level lineage (see the sketch after this list)
- Enhanced Source Connectors (`source_connectors.py`): table pattern resolution (e.g. `ANALYTICS.PUBLIC.*` → actual tables)
- Pattern Matching (`pattern_matchers.py`): matching for table patterns (`database.*`, `schema.table*`, etc.)
- Configuration Constants (`config_constants.py`)
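As a rough illustration of the column-level lineage bullet above (a hedged sketch, not the PR's actual `_extract_fine_grained_lineage()` implementation; URNs and field names are placeholders):

```python
# Sketch: column-level lineage between a source table and its Kafka topic,
# expressed as FineGrainedLineageClass entries for the upstream lineage aspect.
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.metadata.schema_classes import (
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
)

source_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,db.schema.table,PROD)"  # placeholder
topic_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,db.schema.table,PROD)"   # placeholder
shared_fields = ["id", "name"]  # placeholder: would come from the resolved schema

fine_grained_lineages = [
    FineGrainedLineageClass(
        upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
        upstreams=[make_schema_field_urn(source_urn, field)],
        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
        downstreams=[make_schema_field_urn(topic_urn, field)],
    )
    for field in shared_fields
]
```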
Improved Topic Handling:
Code Quality Improvements
Usage
OSS Kafka Connect (Default Behavior)
Confluent Cloud (Auto-Enabled)
Confluent Cloud (Opt-Out)
OSS with Schema Resolver (Explicit Enable)
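The original recipe examples are not reproduced here; as a hedged sketch, the difference between the modes boils down to the fields below (endpoints and IDs are placeholders, shown programmatically via the ingestion `Pipeline` API):

```python
# Sketch: kafka-connect recipes for the two main modes, expressed as config dicts.
from datahub.ingestion.run.pipeline import Pipeline

# Confluent Cloud: use_schema_resolver is auto-enabled; uncomment the override to opt out.
confluent_cloud_recipe = {
    "source": {
        "type": "kafka-connect",
        "config": {
            "connect_uri": "https://api.confluent.cloud/connect/v1/",  # placeholder
            "confluent_cloud_environment_id": "env-xxxxx",  # placeholder
            "confluent_cloud_cluster_id": "lkc-xxxxx",  # placeholder
            # "use_schema_resolver": False,  # opt out of the auto-enable
        },
    },
    "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
}

# OSS Kafka Connect: disabled by default; enable explicitly to query DataHub.
oss_recipe = {
    "source": {
        "type": "kafka-connect",
        "config": {
            "connect_uri": "http://localhost:8083",
            "use_schema_resolver": True,
        },
    },
    "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
}

Pipeline.create(oss_recipe).run()
```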
Testing
Test modules:
- `test_kafka_connect.py` - Core connector tests
- `test_kafka_connect_config_validation.py` - Auto-enable logic tests (8 new tests)
- `test_kafka_connect_schema_resolver.py` - Schema resolver integration
- `test_kafka_connect_snowflake_source.py` - Snowflake connector tests
- `test_kafka_connect_pattern_matchers.py` - Pattern matching tests
- `test_kafka_connect_config_constants.py` - Configuration validation
- `test_kafka_connect_connector_registry.py` - Connector registration tests

Documentation
Updated `docs/sources/kafka-connect/kafka-connect.md` with documentation for the new schema resolver configuration and usage.

Breaking Changes
None - All features are opt-in (or auto-enabled only for Confluent Cloud). Existing Kafka Connect ingestions continue to work unchanged.
Default behavior:
- OSS Kafka Connect: `use_schema_resolver: false` (unchanged behavior)
- Confluent Cloud: `use_schema_resolver: true` (auto-enabled, can be disabled)

Prerequisites for Schema Resolver
IMPORTANT: For schema resolver to work, source database tables must be ingested into DataHub before running Kafka Connect ingestion. Without prior database ingestion, schema resolver will not find table metadata.
Recommended ingestion order:
1. Ingest the source database platforms (e.g. Snowflake) into DataHub so table metadata exists
2. Run the Kafka Connect ingestion with the schema resolver enabled
🤖 Generated with Claude Code
Co-Authored-By: Claude [email protected]